Execution working on all 22 TPCH queries #89
Conversation
gabotechs left a comment
This looks really good! Left just some minor comments, but it looks pretty much there.
benchmarks/src/tpch/run.rs
Outdated
```rust
#[structopt(long = "validate")]
validate: bool,
```
What do you think about moving forward with @jayshrivastava's changes in #83 for validating TPCH correctness instead of this? It might be slightly better to ensure validation there because:
- It would be nice to touch this code as little as possible, as this is pretty much vendored code from upstream DataFusion, and if we decide to move this project there or upstream DataFusion decides to make their benchmarks crate public, it would be difficult to port because of conflicts
- We want to ensure TPCH correctness in the CI, so it might be more suitable to do it as a mandatory test suite using Cargo test tools rather than an optional step during the benchmarks
Yep, I thought the same thing, and I moved it out of the benchmarks and aligned with @jayshrivastava's PR.
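As a rough sketch of the test-suite approach described above (the helper functions here are hypothetical stand-ins; the actual validation logic lives in #83):

```rust
// Hypothetical stand-ins for helpers that execute a TPCH query in each
// mode and return its formatted result rows.
async fn run_single_node(query: &str) -> Vec<String> {
    vec![format!("rows for {query}")] // stand-in result
}

async fn run_distributed(query: &str) -> Vec<String> {
    vec![format!("rows for {query}")] // stand-in result
}

#[tokio::test]
async fn tpch_q1_distributed_matches_single_node() {
    // Mandatory CI check: distributed execution must match single-node output.
    assert_eq!(run_single_node("q1").await, run_distributed("q1").await);
}
```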
src/flight_service/do_get.rs
Outdated
```rust
/*println!(
    "{} Task {:?} executing partition {}",
    stage.name(),
    task.partition_group,
    partition
);*/
```
remove this maybe?
✅
```rust
let (state, stage) = once_stage
    .get_or_try_init(|| async {
```
This will lock the `once_stage` `RefMut` across the initialization, locking a shard in the `self.stages` dashmap across an asynchronous gap, which might be too much locking.

Fortunately, it's very easy to prevent this:

- we can make the `OnceCell` a shared reference:

```rust
pub(super) stages: DashMap<StageKey, Arc<OnceCell<(SessionState, Arc<ExecutionStage>)>>>,
```

- and then immediately drop the reference to the dashmap entry:

```rust
let once_stage = {
    let entry = self.stages.entry(key).or_default();
    Arc::clone(&entry)
    // <- dashmap RefMut gets dropped, releasing the lock for the current shard
};
```
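To complete the picture, a hedged sketch of how the cloned `Arc<OnceCell<…>>` can then be initialized outside the shard lock (simplified stand-in types and a hypothetical `build_stage` helper; the real code uses `SessionState` and `ExecutionStage`):

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::OnceCell;

// Simplified stand-ins for the real StageKey/ExecutionStage types.
type StageKey = String;
type Stage = String;

// Hypothetical initializer for a stage.
async fn build_stage(key: &StageKey) -> Result<Stage, String> {
    Ok(format!("stage for {key}"))
}

async fn get_stage(
    stages: &DashMap<StageKey, Arc<OnceCell<Stage>>>,
    key: StageKey,
) -> Result<Stage, String> {
    // Clone the Arc and immediately drop the dashmap RefMut so the shard
    // lock is not held across the await below.
    let once_stage = {
        let entry = stages.entry(key.clone()).or_default();
        Arc::clone(&entry)
    };
    // Initialization now happens outside the shard lock; concurrent callers
    // for the same key still deduplicate work through the OnceCell.
    let stage = once_stage
        .get_or_try_init(|| async { build_stage(&key).await })
        .await?;
    Ok(stage.clone())
}
```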
A good improvement. Added.
This file contained some tests that tested the behavior of sharing a single execution node across multiple callers with a dashmap. It's a shame to delete them, I would have expected them to still be valid here.
If you see no path forward in keeping those tests it's fine, we can build new ones eventually.
I've added it back, but I'm not sure of its function or necessity, as it's not referenced anywhere.
src/plan/arrow_flight_read.rs
Outdated
```rust
))?;
stream_from_stage_task(ticket.clone(), &url, schema.clone(), &channel_manager).await
let futs = child_stage_tasks.iter().enumerate().map(|(i, task)| {
    let i_capture = i;
```
🤔 `i_capture`? It should not be necessary to capture any variables that implement the `Copy` trait, right?
Good eye and yes, not needed!
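For reference, a small sketch of why no extra binding is needed: `usize` is `Copy`, so an `async move` block copies it directly (illustrative code, not from the PR):

```rust
#[tokio::main]
async fn main() {
    let tasks = vec!["a", "b", "c"];
    let futs = tasks.iter().enumerate().map(|(i, task)| {
        // `i: usize` implements Copy, so the `async move` block copies it;
        // no `let i_capture = i;` rebinding is required.
        async move { println!("task {i}: {task}") }
    });
    futures::future::join_all(futs).await;
}
```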
tests/distributed_aggregation.rs
Outdated
```rust
assert_snapshot!(physical_distributed_str,
/*assert_snapshot!(physical_distributed_str,
```
Does this need to be commented out? I think it should be fine to leave this uncommented; it should work.

FYI, this is using https://github.com/mitsuhiko/insta, which means that if the test fails, you can just run:

```sh
cargo insta review
```

and you will be prompted to accept the changes.

You can install cargo insta with:

```sh
curl -LsSf https://insta.rs/install.sh | sh
```
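For illustration, a minimal insta snapshot test might look like this (an illustrative sketch, not code from this PR):

```rust
#[test]
fn physical_plan_snapshot() {
    // Illustrative plan string; the real tests snapshot the formatted
    // distributed physical plan.
    let plan = "AggregateExec\n  DataSourceExec";
    // The first run records the snapshot; later runs diff against it, and
    // `cargo insta review` lets you accept intentional changes.
    insta::assert_snapshot!(plan);
}
```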
wow, insta is awesome! 💯 Done.
This change adds a DashMap-like struct which has a background task to clean up entries that have outlived a configurable TTL. This struct is similar to https://github.com/moka-rs/moka, which also uses time wheels. Having our own module avoids introducing a large dependency, which keeps this project closer to vanilla DataFusion. This change is meant to be useful for #89, where it's possible for `ExecutionStages` to be orphaned in `ArrowFlightEndpoint`. We need an async task to clean up old entries. Informs: #90
This is awesome! 💯 in it goes
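As an illustration of the TTL-map idea described above, a minimal sketch with a background sweeper (hypothetical names and a simple periodic sweep rather than moka-style time wheels; assumes tokio and dashmap):

```rust
use std::hash::Hash;
use std::sync::Arc;
use std::time::{Duration, Instant};

use dashmap::DashMap;

// Hypothetical sketch, not the actual module: a DashMap wrapper whose
// entries expire after a TTL, swept by a background tokio task rather
// than on access.
pub struct TtlMap<K: Eq + Hash + Send + Sync + 'static, V: Send + Sync + 'static> {
    entries: Arc<DashMap<K, (Instant, V)>>,
}

impl<K: Eq + Hash + Send + Sync + 'static, V: Send + Sync + 'static> TtlMap<K, V> {
    pub fn new(ttl: Duration, sweep_every: Duration) -> Self {
        let entries: Arc<DashMap<K, (Instant, V)>> = Arc::new(DashMap::new());
        let weak = Arc::downgrade(&entries);
        // Background task: wakes up periodically and removes entries older
        // than the TTL. Exits once the map itself has been dropped.
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(sweep_every);
            loop {
                tick.tick().await;
                let Some(map) = weak.upgrade() else { break };
                map.retain(|_, v| v.0.elapsed() < ttl);
            }
        });
        Self { entries }
    }

    pub fn insert(&self, key: K, value: V) {
        self.entries.insert(key, (Instant::now(), value));
    }

    pub fn get(&self, key: &K) -> Option<V>
    where
        V: Clone,
    {
        self.entries.get(key).map(|e| e.value().1.clone())
    }
}
```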
Edit: `NestedLoopJoinExec` is added.

This PR fixes an execution logic bug and also adds a `--validate` flag to the benchmarks to confirm that we calculate the correct result vs the single node case.

To run the TPCH benchmark in a distributed fashion and validate it against single node execution, follow the readme in `benchmarks`.

Note that the particular approach to distributed execution that this library takes requires all joins to be partition joins. We do not support `CollectLeft` in particular. So, the following modifications to the context before planning are required for correct operation:

```rust
config
    .options_mut()
    .optimizer
    .hash_join_single_partition_threshold = 0;
config
    .options_mut()
    .optimizer
    .hash_join_single_partition_threshold_rows = 0;
config.options_mut().optimizer.prefer_hash_join = true;
```

At the moment this is set in the benchmark crate, but we really need to make this easy for the user and not allow them to mess up these values. I'm not sure how to do this at the moment. I think we can refactor subsequent to this PR.
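One possible shape for that follow-up refactor: a helper that applies the required settings in one place so users can't get them wrong (a hypothetical name, just a sketch):

```rust
use datafusion::prelude::SessionConfig;

// Hypothetical helper: centralizes the optimizer settings that distributed
// execution currently requires, so callers don't set them by hand.
pub fn with_distributed_join_defaults(mut config: SessionConfig) -> SessionConfig {
    config
        .options_mut()
        .optimizer
        .hash_join_single_partition_threshold = 0;
    config
        .options_mut()
        .optimizer
        .hash_join_single_partition_threshold_rows = 0;
    config.options_mut().optimizer.prefer_hash_join = true;
    config
}
```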
Regarding adding support for other Hash join modes, I think we can do that, and then use the benchmark to compare and evaluate options for execution speed.